home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """Provides access to PolicyKit privilege mangement using gdefer Deferreds."""
- # Copyright (C) 2008-2009 Sebastian Heinlein <devel@glatzor.de>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
- __author__ = "Sebastian Heinlein <devel@glatzor.de>"
-
- import os
-
- import dbus
-
- from defer import Deferred, defer
-
- PK_ACTION_REMOVE_PACKAGES = "org.debian.apt.remove-packages"
- PK_ACTION_INSTALL_PACKAGES = "org.debian.apt.install-packages"
- PK_ACTION_INSTALL_FILE = "org.debian.apt.install-file"
- PK_ACTION_UPGRADE_PACKAGES = "org.debian.apt.upgrade-packages"
- PK_ACTION_UPDATE_CACHE = "org.debian.apt.update-cache"
- PK_ACTION_UPGRADE_SYSTEM = "org.debian.apt.upgrade-system"
- PK_ACTION_CANCEL_FOREIGN = "org.debian.apt.cancel-foreign"
- PK_ACTION_ADD_VENDOR_KEY = "org.debian.apt.add-vendor-key"
- PK_ACTION_GET_TRUSTED_VENDOR_KEYS = "org.debian.apt.get-trusted-vendor-keys"
- PK_ACTION_REMOVE_VENDOR_KEY = "org.debian.apt.remove-vendor-key"
- PK_ACTION_CHANGE_REPOSITORY = "org.debian.apt.change-repository"
-
- CHECK_AUTH_NONE = 0
- CHECK_AUTH_ALLOW_USER_INTERACTION = 1
-
-
- class NotAuthorizedError(dbus.DBusException):
-
- def __init__(self, subject, action_id):
- message = "%s is not authorized: %s" % (subject, action_id)
- dbus.DBusException.__init__(self, message,
- name="org.freedesktop.PolicyKit.Error."
- "NotAuthorized")
- self.action_id = action_id
- self.subject = subject
-
- def check_authorization_by_name(dbus_name, action_id, timeout=300, bus=None):
- """Check if the given sender is authorized for the specified action.
-
- If the sender is not authorized raise NotAuthorizedError.
-
- Keyword arguments:
- dbus_name -- D-Bus name of the subject
- action_id -- the PolicyKit policy name of the action
- timeout -- time in seconds for the user to authenticate
- bus -- the D-Bus connection (defaults to the system bus)
- """
- subject = ("system-bus-name", {"name": dbus_name})
- return _check_authorization(subject, action_id, timeout, bus)
-
- def check_authorization_by_pid(pid, action_id, timeout=300, bus=None):
- """Check if the given process is authorized for the specified action.
-
- If the sender is not authorized raise NotAuthorizedError.
-
- Keyword arguments:
- pid -- id of the process
- action_id -- the PolicyKit policy name of the action
- timeout -- time in seconds for the user to authenticate
- bus -- the D-Bus connection (defaults to the system bus)
- """
- subject = ("unix-process", {"pid": pid})
- return _check_authorization(subject, action_id, timeout, bus)
-
- def _check_authorization(subject, action_id, timeout, bus):
- def policykit_done((authorized, challenged, auth_details)):
- if authorized:
- deferred.callback(auth_details)
- else:
- deferred.errback(NotAuthorizedError(subject, action_id))
- if not bus:
- bus = dbus.SystemBus()
- deferred = Deferred()
- pk = bus.get_object("org.freedesktop.PolicyKit1",
- "/org/freedesktop/PolicyKit1/Authority")
- details = {}
- pk.CheckAuthorization(subject, action_id, details,
- CHECK_AUTH_ALLOW_USER_INTERACTION, "",
- dbus_interface="org.freedesktop.PolicyKit1.Authority",
- timeout=timeout,
- reply_handler=policykit_done,
- error_handler=deferred.errback)
- return deferred
-
- def get_pid_from_dbus_name(dbus_name, bus=None):
- """Return a deferred that gets the id of process owning the given
- system D-Bus name.
- """
- if not bus:
- bus = dbus.SystemBus()
- deferred = Deferred()
- bus_obj = bus.get_object("org.freedesktop.DBus",
- "/org/freedesktop/DBus/Bus")
- bus_obj.GetConnectionUnixProcessID(dbus_name,
- dbus_interface="org.freedesktop.DBus",
- reply_handler=deferred.callback,
- error_handler=deferred.errback)
- return deferred
-
- def get_uid_from_dbus_name(dbus_name, bus=None):
- """Return a deferred that gets the uid of the user owning the given
- system D-Bus name.
- """
- if not bus:
- bus = dbus.SystemBus()
- deferred = get_pid_from_dbus_name(dbus_name)
- deferred.add_callback(_get_uid_from_pid)
- return deferred
-
- def _get_uid_from_pid(pid):
- """Return the uid of the process."""
- proc = open("/proc/%s/status" % pid)
- values = [v for v in proc.readlines() if v.startswith("Uid:")]
- proc.close()
- uid = int(values[0].split()[1])
- return uid
-
-
- # vim:ts=4:sw=4:et
-